{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## HumanEval and MBPP Code Generation Datasets\n",
    "\n",
    "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LAION-AI/Open-Assistant/blob/main/data/datasets/codet_humaneval_mbpp/HumanEval_and_MBPP_code_gen.ipynb)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook contains code to parse HumanEval and MBPP Python code generation prompt and solution data and modify to `(prompt, solution)` pairs outputted in a `.jsonl` file.\n",
    "\n",
    "Requirements: `requests`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "from pathlib import Path\n",
    "import requests"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "DATA_SOURCES: list[str] = [\"HumanEval\", \"mbpp_sanitized\"]\n",
    "DATA_FILES: list[str] = [f\"{source}_for_code_generation.jsonl\" for source in DATA_SOURCES]\n",
    "OUT_FILES: list[str] = [f\"{source}_codegen_qa.jsonl\" for source in DATA_SOURCES]\n",
    "\n",
    "IN_DIR = Path(\"data\")\n",
    "OUT_DIR = Path(\"data/qa\")\n",
    "OUT_DIR.mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "FILE_PATHS: list[Path] = [IN_DIR / data_file for data_file in DATA_FILES]\n",
    "OUT_PATHS: list[Path] = [OUT_DIR / out_file for out_file in OUT_FILES]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "def download_file(filename: str):\n",
    "    url = f\"https://raw.githubusercontent.com/microsoft/CodeT/main/CodeT/data/dataset/{filename}\"\n",
    "    response = requests.get(url)\n",
    "    with open(f\"data/{filename}\", \"wb\") as f:\n",
    "        f.write(response.content)\n",
    "\n",
    "\n",
    "for filename in DATA_FILES:\n",
    "    download_file(filename)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can find the docstring, use its contents as the instruction (prefixed with a prompt) and then use the content prior to the docstring and the canonical solution as the response."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_docstring_indices(prompt_lines: list[str]) -> tuple[int, int]:\n",
    "    docstring_start, docstring_end = None, None\n",
    "\n",
    "    for i, line in enumerate(prompt_lines):\n",
    "        if not (line.strip().startswith('\"\"\"') or line.strip().startswith(\"'''\")):\n",
    "            continue\n",
    "        if docstring_start:\n",
    "            docstring_end = i\n",
    "            break\n",
    "        docstring_start = i\n",
    "\n",
    "    if docstring_end:\n",
    "        return docstring_start, docstring_end\n",
    "    raise ValueError(f\"No complete docstring found!\\n{prompt_lines}\")\n",
    "\n",
    "\n",
    "def get_before(prompt_lines: list[str], before: int) -> list[str]:\n",
    "    before_lines = prompt_lines[:before]\n",
    "    return before_lines\n",
    "\n",
    "\n",
    "def get_between(prompt_lines: list[str], start: int, end: int) -> list[str]:\n",
    "    between_lines = prompt_lines[start:end]\n",
    "    return between_lines"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_request_and_solution(sample: dict) -> tuple[list[str], list[str]]:\n",
    "    prompt = sample[\"prompt\"]\n",
    "    prompt_lines = prompt.splitlines()\n",
    "\n",
    "    docstring_start, docstring_end = get_docstring_indices(prompt_lines)\n",
    "\n",
    "    # Extract prompt\n",
    "    in_docstring = get_between(prompt_lines, docstring_start, docstring_end)\n",
    "    if '\"\"\"' in in_docstring[0] or \"'''\" in in_docstring[0]:\n",
    "        in_docstring[0] = in_docstring[0].replace('\"\"\"', \"\").replace(\"'''\", \"\").strip()\n",
    "    request = \"Write a Python function which follows this instruction: \" + \" \".join([p.strip() for p in in_docstring])\n",
    "\n",
    "    # Extract solution\n",
    "    before_docstring = get_before(prompt_lines, docstring_start)\n",
    "    after_docstring = sample[\"canonical_solution\"].splitlines()\n",
    "    solution = before_docstring + after_docstring\n",
    "    # Gets rid of consecutive empty lines\n",
    "    solution = [v for i, v in enumerate(solution) if v != \"\" or v != solution[i - 1]]\n",
    "    solution = \"\\n\".join(solution)\n",
    "\n",
    "    return request, solution"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "def process_file(file_path: Path, out_path: Path, source: str):\n",
    "    lines = file_path.read_text().splitlines()\n",
    "    samples = list(map(json.loads, lines))\n",
    "\n",
    "    output = []\n",
    "    for sample in samples:\n",
    "        prompt, solution = get_request_and_solution(sample)\n",
    "        output.append({\"INSTRUCTION\": prompt, \"RESPONSE\": solution, \"SOURCE\": source})\n",
    "\n",
    "    with open(out_path, \"w\") as f:\n",
    "        for sample in output:\n",
    "            f.write(json.dumps(sample))\n",
    "            f.write(\"\\n\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "for file_path, out_path, source in zip(FILE_PATHS, OUT_PATHS, DATA_SOURCES):\n",
    "    process_file(file_path, out_path, source)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Display a sample output from HumanEval"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Prompt\n",
      "Write a Python function which follows this instruction: Check if in given list of numbers, are any two numbers closer to each other than given threshold.\n",
      "\n",
      "Solution\n",
      "from typing import List\n",
      "\n",
      "def has_close_elements(numbers: List[float], threshold: float) -> bool:\n",
      "    for idx, elem in enumerate(numbers):\n",
      "        for idx2, elem2 in enumerate(numbers):\n",
      "            if idx != idx2:\n",
      "                distance = abs(elem - elem2)\n",
      "                if distance < threshold:\n",
      "                    return True\n",
      "\n",
      "    return False\n"
     ]
    }
   ],
   "source": [
    "sample = json.loads(OUT_PATHS[0].read_text().splitlines()[0])\n",
    "\n",
    "print(\"Prompt\")\n",
    "print(sample[\"INSTRUCTION\"])\n",
    "print()\n",
    "print(\"Solution\")\n",
    "print(sample[\"RESPONSE\"])"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Display a sample output from MBPP"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Prompt\n",
      "Write a Python function which follows this instruction:  Write a function to find the shared elements from the given two lists.\n",
      "\n",
      "Solution\n",
      "def similar_elements(test_tup1, test_tup2):\n",
      "  res = tuple(set(test_tup1) & set(test_tup2))\n",
      "  return (res) \n"
     ]
    }
   ],
   "source": [
    "sample = json.loads(OUT_PATHS[1].read_text().splitlines()[0])\n",
    "\n",
    "print(\"Prompt\")\n",
    "print(sample[\"INSTRUCTION\"])\n",
    "print()\n",
    "print(\"Solution\")\n",
    "print(sample[\"RESPONSE\"])"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Upload to HuggingFace"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from datasets import Dataset\n",
    "\n",
    "humaneval_mbpp_codegen_qa_ds = Dataset.from_json([str(p) for p in OUT_PATHS])\n",
    "humaneval_mbpp_codegen_qa_ds.push_to_hub(\"OllieStanley/humaneval-mbpp-codegen-qa\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "oa-env",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.8"
  },
  "orig_nbformat": 4,
  "vscode": {
   "interpreter": {
    "hash": "329db72e1bc5b877a83eb22fb38cb9ecd67b294bcd84477fe23b24f012cf9e60"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
